Skip to content

fix(cdk): fix state emit for ConcurrentPerPartitionCursor#539

Merged
Serhii Lazebnyi (lazebnyi) merged 2 commits intomainfrom
lazebnyi/fix-state-emit-for-concurrent-per-partition-cursor
May 9, 2025
Merged

fix(cdk): fix state emit for ConcurrentPerPartitionCursor#539
Serhii Lazebnyi (lazebnyi) merged 2 commits intomainfrom
lazebnyi/fix-state-emit-for-concurrent-per-partition-cursor

Conversation

@lazebnyi
Copy link
Copy Markdown
Contributor

@lazebnyi Serhii Lazebnyi (lazebnyi) commented May 9, 2025

What

If the parent stream of a per-partition stream commits states, we may emit the state more than once at the end of the sync for a global cursor.

How

Added a condition in ConcurrentPerPartitionCursor to avoid emitting the state when the global cursor is enabled, there's no parent_stream in the state, and throttling is applied.

Summary by CodeRabbit

  • Bug Fixes

    • Improved incremental sync behavior to prevent unnecessary state message emissions when using a global cursor with no parent state and throttling enabled.
  • Tests

    • Added new test scenarios to validate incremental sync with a global cursor and no incremental dependency.
    • Enhanced test utilities to verify the number of emitted state messages during sync.
  • Style

    • Removed an extraneous blank line to improve code readability.

@lazebnyi
Copy link
Copy Markdown
Contributor Author

/autofix

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented May 9, 2025

📝 Walkthrough

Walkthrough

The updates introduce a conditional check in the concurrent partition cursor logic to skip emitting state messages under specific global cursor and throttling conditions. A redundant blank line is removed from a factory method. Unit tests are expanded to cover scenarios with global cursors and no incremental dependencies, including assertions on emitted state message counts.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py Added a conditional in _emit_state_message to skip emitting state messages when using a global cursor with throttling and no parent state.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Removed an extraneous blank line after stream state migration application in a factory method.
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py Added a new manifest for global cursor/no dependency, extended run_mocked_test with a state_count parameter, added a new test for global cursor scenarios, and updated parameterized tests to support state message count assertions.

Sequence Diagram(s)

sequenceDiagram
    participant TestRunner
    participant MockAPI
    participant ConcurrentPartitionCursor

    TestRunner->>ConcurrentPartitionCursor: Initiate sync with global cursor and no dependency
    loop For each partition
        ConcurrentPartitionCursor->>MockAPI: Fetch partition data (posts, comments, votes)
        MockAPI-->>ConcurrentPartitionCursor: Return paginated responses
    end
    alt Throttling enabled & using global cursor & parent state empty
        ConcurrentPartitionCursor-->>TestRunner: Skip emitting state message
    else
        ConcurrentPartitionCursor-->>TestRunner: Emit state message
    end
    TestRunner->>TestRunner: Assert number of emitted state messages
Loading

Would you like me to create a diagram comparing the old and new state message emission flows as well, or is this overview sufficient for your needs? Wdyt?

Suggested reviewers

  • lazebnyi
  • tolik0

Tip

⚡️ Faster reviews with caching
  • CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.

Enjoy the performance boost—your workflow just got faster.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 40ce8b1 and 0b37544.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (6)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (6)

312-324: LGTM - New test manifest looks good

The new manifest variant SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY properly extends the existing test setup to cover the specific case addressed in the PR. You've disabled incremental dependency and enabled the global cursor flag exactly as needed to test the fix.


338-338: Good addition of state_count parameter

Adding the optional state_count parameter to run_mocked_test allows you to verify that the state emission behavior works correctly. This is critical for testing the fix described in the PR.


378-379: Nice assertion to validate state message count

This assertion is key to the PR fix - it verifies that the number of emitted state messages matches the expected count, which helps confirm that duplicate states aren't being emitted.


457-458: Good parameter update

Correctly updated the parametrization to include the new state_count parameter.


768-1020: Well-structured test case to validate the fix

This new test case test_incremental_parent_state_with is comprehensive and exactly covers the scenario from the PR description. It uses the global cursor configuration and verifies that only one state message is emitted.

I particularly like:

  1. The detailed mock requests setup that simulates the full data flow
  2. Setting the expected state count to 1, which verifies that duplicate emissions are prevented
  3. The thorough expected records and state validation

1024-1025: Good function signature and parameter update

Properly updated the function signature and parameter passing to accommodate the new state count validation.

Also applies to: 1046-1047

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

245-247: The conditional skip looks good for preventing redundant emissions.

This addition intelligently prevents double-emitting state messages when using global cursors without a parent state. This should effectively fix the issue described in the PR where state messages were emitted multiple times at the end of synchronization.

Are you confident this won't affect any other scenarios where we need state emission? Perhaps we should add a debug log here to help with troubleshooting if someone encounters issues, wdyt?

if self._use_global_cursor and not self._parent_state:
+    logger.debug(f"Skipping state emission for global cursor with no parent state for stream {self._stream_name}")
    return
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bcfcf04 and 40ce8b1.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (0 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
💤 Files with no reviewable changes (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py (1)
  • test_incremental_parent_state_no_incremental_dependency (1491-1534)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (7)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (7)

312-321: Good addition of test manifests for global cursor scenarios!

This new manifest variant properly configures a global cursor with disabled incremental dependencies, which is essential for testing the fix for duplicate state emission with global cursors. Great job covering this case explicitly.


329-329: Smart extension to the test helper function!

Adding the optional state_count parameter allows you to verify the exact number of state messages emitted, which directly addresses the core issue being fixed. This is a clean way to validate the behavior without invasive changes.


369-369: Good assertion for state message count validation.

This new assertion ensures the correct number of state messages are emitted when the parameter is provided. It gracefully handles cases where state count isn't important by using a default True assertion.


448-449: Properly updated parameter lists to support state count validation.

You've consistently updated both the parameterized test definition and the function signature to include the new state_count parameter. This maintains the pattern used throughout the file.

Also applies to: 1015-1016


759-1010: Excellent test case for global cursor with no incremental dependency!

This comprehensive test verifies that when using a global cursor with no incremental dependency, only a single state message is emitted (state_count=1), directly addressing the issue described in the PR. The test data setup is thorough and validates the specific fix implemented.


1037-1037: Correctly passes state_count to the test helper.

The state_count parameter is properly passed to run_mocked_test, ensuring validation happens as expected for each test case.


1-1:

Details

✅ Verification successful

Ruff formatting issue needs to be fixed.

The pipeline shows a Ruff formatting issue. Would you mind running 'ruff format' on this file before merge? This will ensure consistent code style across the codebase.


🏁 Script executed:

#!/bin/bash
ruff format unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Length of output: 113


Formatting issues addressed

I ran ruff format on this file and it was reformatted successfully. Everything looks consistent now—ready to merge? wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

@lazebnyi
Copy link
Copy Markdown
Contributor Author

Serhii Lazebnyi (lazebnyi) commented May 9, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! :shipit:

@lazebnyi Serhii Lazebnyi (lazebnyi) merged commit d7ebfd9 into main May 9, 2025
26 of 28 checks passed
@lazebnyi Serhii Lazebnyi (lazebnyi) deleted the lazebnyi/fix-state-emit-for-concurrent-per-partition-cursor branch May 9, 2025 22:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants